package com.zenitera.bigdata.source;

import com.zenitera.bigdata.bean.WaterSensor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/**
 * 自定义Source
 */

public class Flink05_Source_Custom {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
                .addSource(new MySource("wangt-flink03", 6666))
                .print();

        env.execute();
    }

    public static class MySource implements SourceFunction<WaterSensor> {
        private String host;
        private int port;
        private volatile boolean isRunning = true;
        private Socket socket;

        public MySource(String host, int port) {
            this.host = host;
            this.port = port;
        }


        @Override
        public void run(SourceContext<WaterSensor> ctx) throws Exception {
            socket = new Socket(host, port);
            BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            String line = null;
            while (isRunning && (line = reader.readLine()) != null) {
                String[] split = line.split(",");
                ctx.collect(new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2])));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


/*
[wangting@wangt-flink03 ~]$ nc -lk 6666
sensor_1,1607527992000,20
sensor_1,1607527993000,40
sensor_1,1607527994000,50
sensor_1,1607527994000,80
sensor_1,1607527994000,70

=================================
3> WaterSensor(id=sensor_1, ts=1607527992000, vc=20)
4> WaterSensor(id=sensor_1, ts=1607527993000, vc=40)
1> WaterSensor(id=sensor_1, ts=1607527994000, vc=50)
2> WaterSensor(id=sensor_1, ts=1607527994000, vc=80)
3> WaterSensor(id=sensor_1, ts=1607527994000, vc=70)

Process finished with exit code -1

 */
